package netService

import (
	"fmt"
	"gitee.com/ling-bin/go-utils/pools"
	"gitee.com/ling-bin/network/netInterface"
	"hash/fnv"
	"log"
	"sync"
	"time"

	"net"
	"sync/atomic"
)

// UdpService is the UDP server implementation of the service interface.
// It owns the listening sockets, the per-remote-address connection table,
// the user-supplied hooks and the worker pools that process traffic.
type UdpService struct {
	connId             uint64                                                                                                    // running counter used to allocate connection IDs
	connMgr            netInterface.IConnManager                                                                                 // connection manager of this server
	listenMap          sync.Map                                                                                                  // listening sockets, stored under their local address string
	onLogHandle        func(level netInterface.ErrLevel, msg ...interface{})                                                     // internal log / error handler hook
	onConnStart        func(conn netInterface.IConnection)                                                                       // hook called when a connection of this server is created
	onConnStop         func(conn netInterface.IConnection)                                                                       // hook called when a connection of this server is closed
	onReceive          func(conn netInterface.IConnection, data []byte)                                                          // hook called when uploaded data has been received
	onReply            func(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) // hook called after data has been sent to the client
	exitChan           chan bool                                                                                                 // channel used to signal that the service has exited / stopped
	handleStrategy     func(conn netInterface.IConnection, data []byte) *netInterface.StrategyData                               // handling strategy; TCP calls it only for the first packet of a connection, UDP calls it for every packet
	bufferPool         *pools.BufferPool                                                                                         // buffer manager
	connIdAddrMap      sync.Map                                                                                                  // client address key -> connection
	config             *Config                                                                                                   // service configuration
	receiveHandler     pools.ITaskWorkerPool                                                                                     // worker pool that processes received messages
	replyHandle        pools.ITaskWorkerPool                                                                                     // worker pool that sends replies
	receiveUdpTaskPool *receiveUdpTaskPool                                                                                       // pool of reusable UDP receive tasks
	startTime          time.Time                                                                                                 // time at which Start ran
	isStart            bool                                                                                                      // whether the service is started (true = started, false = not started)
	detectionTask      map[uint64]map[uint64]uint64                                                                              // time bucket -> (connection ID -> bucket) of connections awaiting an idle check
	taskChan           chan netInterface.IConnection                                                                             // connections waiting to be scheduled into detectionTask
	// lastKey is the most recent time bucket handled by recoveryConn.
	lastKey            uint64
}

// NewUdpService builds a UDP service from config and returns it as an
// IService.
//
// It allocates the connection manager, the receive and reply worker pools
// (sized from config), the UDP receive-task pool (sized from
// config.BufferSize) and the idle-detection bookkeeping, then launches the
// recoveryConn goroutine. config must not be nil: its fields are read here.
func NewUdpService(config *Config) netInterface.IService {
	svc := new(UdpService)
	svc.config = config
	svc.connMgr = NewConnManager()
	// Buffered so a single exit notification never blocks the sender.
	svc.exitChan = make(chan bool, 1)
	svc.detectionTask = make(map[uint64]map[uint64]uint64, 400)
	svc.receiveHandler = pools.NewTaskWorkerPool("数据接收处理器", config.ReceiveWorkerSize, config.ReceiveTaskQueueSize)
	svc.replyHandle = pools.NewTaskWorkerPool("数据回复处理器", config.ReplyWorkerSize, config.ReplyTaskQueueSize)
	svc.receiveUdpTaskPool = newReceiveUdpTaskPool(uint(config.BufferSize))
	svc.taskChan = make(chan netInterface.IConnection, 2000)

	go svc.recoveryConn()
	return svc
}

// addDetection hands conn to the idle-detection loop by sending it on
// taskChan, from which addTask receives. The send blocks while the channel
// buffer is full.
func (s *UdpService) addDetection(conn netInterface.IConnection) {
	s.taskChan <- conn
}

// getTimeBucket maps heartTime to the index of the 4-second window that
// contains it: the Unix timestamp in seconds divided by 4, truncated.
func (s *UdpService) getTimeBucket(heartTime time.Time) uint64 {
	const bucketSeconds = 4
	unixSeconds := heartTime.Unix()
	return uint64(unixSeconds / bucketSeconds)
}

// recoveryConn is the idle-connection sweeper. It runs in its own goroutine
// (launched by NewUdpService) and is the only code that touches
// detectionTask and lastKey.
//
// Time is divided into buckets (see getTimeBucket). Every connection is
// filed under the bucket in which its keep-alive deadline
// (GetHeartTime + config.KeepTime) falls. The loop walks the buckets one at a
// time; for each connection found in the bucket being processed it
// recomputes the deadline: if the deadline is still in this bucket the
// connection is stopped, otherwise it is re-filed under its new bucket.
//
// The goroutine is created before Start() sets isStart, so it first waits
// for the service to be started. Previously the loop condition was checked
// immediately, which normally made the goroutine exit before Start() ran and
// left idle detection disabled. The loop ends once Stop() clears isStart.
func (s *UdpService) recoveryConn() {
	// Wait for Start(); the sweeper is launched from the constructor.
	for !s.isStart {
		time.Sleep(time.Millisecond * 5)
	}

	// Begin a few buckets in the past so nothing scheduled around start-up
	// is skipped; the loop catches up one bucket per iteration.
	timeKey := s.getTimeBucket(time.Now()) - 10
	for s.isStart {
		// Drain newly registered connections; this also paces the loop,
		// because addTask sleeps briefly once the queue is empty.
		s.addTask()

		currentTime := s.getTimeBucket(time.Now())
		if timeKey > currentTime {
			// Tolerate the clock having moved backwards a little.
			timeKey = currentTime - 10
		}
		if timeKey < currentTime {
			// Advance the wheel by one bucket.
			timeKey++
		}
		if timeKey == s.lastKey {
			// This bucket has already been processed.
			continue
		}

		dMap, ok := s.detectionTask[timeKey]
		s.lastKey = timeKey
		if !ok {
			continue
		}
		// Detach the bucket before walking it; survivors are re-filed below.
		delete(s.detectionTask, timeKey)

		for key := range dMap {
			conn, ok := s.connMgr.Get(key)
			if !ok {
				// The connection is no longer registered with the manager.
				continue
			}
			bucketId := s.getTimeBucket(conn.GetHeartTime().Add(s.config.KeepTime))
			if bucketId == timeKey {
				// No heartbeat moved the deadline out of this bucket.
				conn.Stop()
				continue
			}
			// The deadline moved: file the connection under its new bucket.
			mapItem, ok := s.detectionTask[bucketId]
			if !ok {
				mapItem = map[uint64]uint64{}
				s.detectionTask[bucketId] = mapItem
			}
			mapItem[conn.GetConnId()] = bucketId
		}
	}
}

// addTask drains taskChan without blocking and files every received
// connection under the time bucket of its keep-alive deadline
// (GetHeartTime + config.KeepTime). A connection whose bucket is not later
// than lastKey is stopped immediately instead of being filed. When the
// channel is empty the function sleeps 5ms and returns.
func (s *UdpService) addTask() {
	for {
		select {
		case pending := <-s.taskChan:
			deadlineBucket := s.getTimeBucket(pending.GetHeartTime().Add(s.config.KeepTime))
			if deadlineBucket <= s.lastKey {
				// The sweeper has already passed this bucket.
				pending.Stop()
				continue
			}
			members, exists := s.detectionTask[deadlineBucket]
			if !exists {
				members = map[uint64]uint64{}
				s.detectionTask[deadlineBucket] = members
			}
			members[pending.GetConnId()] = deadlineBucket
		default:
			// Nothing queued: pause briefly and hand control back.
			time.Sleep(time.Millisecond * 5)
			return
		}
	}
}

// GetConnMgr returns the connection manager used by this service.
func (s *UdpService) GetConnMgr() netInterface.IConnManager {
	return s.connMgr
}

// GetConn looks up the connection with the given ID in the connection
// manager. The boolean result is the manager's found flag.
func (s *UdpService) GetConn(connId uint64) (netInterface.IConnection, bool) {
	return s.connMgr.Get(connId)
}

// GetStartTime returns the time recorded by Start. It is the zero time.Time
// if Start has not run yet.
func (s *UdpService) GetStartTime() time.Time {
	return s.startTime
}

// SetLogHandle registers hookFunc as the handler for the service's internal
// log output; CallLogHandle forwards to it.
func (s *UdpService) SetLogHandle(hookFunc func(level netInterface.ErrLevel, msg ...interface{})) {
	s.onLogHandle = hookFunc
}

// SetOnConnStart registers the hook that CallOnConnStart invokes when a
// connection of this server is created.
func (s *UdpService) SetOnConnStart(hookFunc func(netInterface.IConnection)) {
	s.onConnStart = hookFunc
}

// SetOnConnStop registers the hook that CallOnConnStop invokes when a
// connection of this server is closed.
func (s *UdpService) SetOnConnStop(hookFunc func(netInterface.IConnection)) {
	s.onConnStop = hookFunc
}

// SetOnReceive registers the hook that CallOnReceive invokes with uploaded
// data (after packet splitting, per the original author's note).
func (s *UdpService) SetOnReceive(hookFunc func(netInterface.IConnection, []byte)) {
	s.onReceive = hookFunc
}

// SetOnReply registers the hook that CallOnReply invokes after data has been
// sent to a client.
func (s *UdpService) SetOnReply(hookFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) {
	s.onReply = hookFunc
}

// CallOnReply invokes the registered reply hook, if any, with the outcome of
// a send. A panic raised by the hook is recovered and reported through
// CallLogHandle at Error level instead of propagating to the caller.
func (s *UdpService) CallOnReply(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) {
	if s.onReply == nil {
		return
	}
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		s.CallLogHandle(netInterface.Error, "[UDP]下发后回调业务逻辑异常:", cause)
	}()
	s.onReply(conn, data, isOk, cmdCode, param, err)
}


// CallOnReceive invokes the registered receive hook, if any, with the
// uploaded data. Panics raised by the hook are not recovered here.
func (s *UdpService) CallOnReceive(conn netInterface.IConnection, data []byte) {
	if s.onReceive == nil {
		return
	}
	s.onReceive(conn, data)
}

// SetHandleStrategy registers the handling-strategy function that handleConn
// calls for every received datagram.
func (s *UdpService) SetHandleStrategy(hookFunc func(netInterface.IConnection, []byte) *netInterface.StrategyData) {
	s.handleStrategy = hookFunc
}

// CallLogHandle forwards a log record to the registered log handler, if any.
//
// level is the severity; msgAry holds the message parts and is expanded into
// the handler's variadic parameter, so the handler receives exactly the
// values passed here. (Previously the slice itself was passed as a single
// element, so handlers saw one nested []interface{} instead of the parts.)
//
// A panic raised by the handler is recovered and written to the standard
// logger, so logging can never take down the caller.
func (s *UdpService) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
	if s.onLogHandle == nil {
		return
	}
	defer func() {
		if r := recover(); r != nil {
			log.Println("[UDP]CallLogHandle 错误消息处理调用业务逻辑异常:", r)
		}
	}()
	s.onLogHandle(level, msgAry...)
}

// CallOnConnStart invokes the registered connection-start hook, if any.
// A panic raised by the hook is recovered and reported through
// CallLogHandle at Error level.
func (s *UdpService) CallOnConnStart(conn netInterface.IConnection) {
	if s.onConnStart == nil {
		return
	}
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		s.CallLogHandle(netInterface.Error, "[UDP]调用开始连接业务逻辑异常：", cause)
	}()
	s.onConnStart(conn)
}

// CallOnConnStop unregisters conn from the address table and then invokes
// the registered connection-stop hook, if any.
//
// The address entry is removed only when the connection currently stored
// under conn's address key has the same connection ID, so an entry that has
// since been taken over by a different connection is left in place. A panic
// raised by the hook is recovered and reported through CallLogHandle at
// Error level.
func (s *UdpService) CallOnConnStop(conn netInterface.IConnection) {
	if addrKey, err := conn.GetProperty(connKey); err == nil {
		if stored, found := s.connIdAddrMap.Load(addrKey); found {
			registered := stored.(netInterface.IConnection)
			if registered.GetConnId() == conn.GetConnId() {
				s.connIdAddrMap.Delete(addrKey)
			}
		}
	}

	if s.onConnStop == nil {
		return
	}
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		s.CallLogHandle(netInterface.Error, "[UDP]调用断开连接业务逻辑异常：", cause)
	}()
	s.onConnStop(conn)
}

// GetIsStart reports whether the service is started: true after Start has
// run, false before Start or after Stop.
func (s *UdpService) GetIsStart() bool {
	return s.isStart
}

// Start brings the service up. It is a no-op if the service is already
// started.
//
// It marks the service as started, starts the receive and reply worker pools
// (pool errors are reported through CallLogHandle at Error level), records
// the start time, and launches one listener goroutine per configured
// address. Start returns once every listener has reported in, whether it
// succeeded or failed to bind.
func (s *UdpService) Start() {
	if s.isStart {
		return
	}
	s.isStart = true

	// Worker pool that processes received messages.
	reportReceivePoolErr := func(errString string) {
		s.CallLogHandle(netInterface.Error, fmt.Sprint("消息处理工作池：", errString))
	}
	s.receiveHandler.StartWorkerPool(reportReceivePoolErr)

	// Worker pool that sends replies.
	reportReplyPoolErr := func(errString string) {
		s.CallLogHandle(netInterface.Error, fmt.Sprint("消息发送工作池：", errString))
	}
	s.replyHandle.StartWorkerPool(reportReplyPoolErr)

	s.startTime = time.Now()

	// Open every configured listener and wait until each has signalled.
	var ready sync.WaitGroup
	ready.Add(len(s.config.AddrAry))
	for _, listenCfg := range s.config.AddrAry {
		go s.listenAddr(listenCfg, &ready)
	}
	ready.Wait()
}

// listenAddr opens one UDP listener for addrConfig and then runs its read
// loop; it only returns early when set-up fails.
//
// wg.Done is called exactly once, as soon as the outcome of the set-up is
// known, so that Start can stop waiting before the read loop begins.
// A failure to resolve the address is logged at Fatal level and the function
// returns; a failure to bind is logged at Fatal level and then panics.
func (s *UdpService) listenAddr(addrConfig *AddrConfig, wg *sync.WaitGroup) {
	network, address := s.config.Network, addrConfig.Addr
	s.CallLogHandle(netInterface.Info, "[开启] 服务监听 [", network, "]地址[", address, "]")

	// Resolve the configured address.
	udpAddr, err := net.ResolveUDPAddr(network, address)
	if err != nil {
		wg.Done()
		s.CallLogHandle(netInterface.Fatal, "地址转换[", network, "][", address, "]错误:", err)
		return
	}

	// Bind the socket.
	uConn, err := net.ListenUDP(network, udpAddr)
	if err != nil {
		wg.Done()
		s.CallLogHandle(netInterface.Fatal, "监听[", network, "][", address, "]错误:", err)
		panic(fmt.Sprint("监听[", network, "][", address, "]错误:", err))
	}

	// Register the listener so Stop can find and close it.
	s.listenMap.Store(uConn.LocalAddr().String(), uConn)
	s.CallLogHandle(netInterface.Info, "服务开启成功 [", network, "]地址[", address, "]正在监听中...")
	wg.Done()

	// Serve datagrams arriving on this socket.
	s.startReader(uConn)
}

// startReader is the blocking read loop of one listening UDP socket.
//
// Each iteration takes a receive task from the pool, reads one datagram into
// its buffer, fills in the socket, remote address, byte count, completion
// callback (handleConn) and a hash of the remote address, and hands the task
// to the receive worker pool. With config.OverflowDiscard unset the hand-off
// waits for queue space; otherwise the task is offered without waiting and a
// rejection is logged.
//
// Read errors:
//   - if the service has been stopped, the loop ends (Stop closes the socket,
//     which makes every further read fail; previously the loop kept spinning
//     on that error and logging it forever);
//   - temporary / timeout errors are retried after an exponential back-off
//     capped at one second;
//   - any other error is logged at Fatal level and the loop continues.
//
// On exit the socket is closed and removed from listenMap; a panic inside
// the loop is recovered and logged.
func (s *UdpService) startReader(uConn *net.UDPConn) {
	defer func() {
		uConn.Close()
		s.listenMap.Delete(uConn.LocalAddr().String())
		if r := recover(); r != nil {
			s.CallLogHandle(netInterface.Error, "[udp]连接读取数据异常:", r)
		}
	}()
	tempDelay := time.Millisecond * 0
	maxDelay := 1 * time.Second
	for {
		/*
		 * Original author's note on UDP reads into a buffer that is too small:
		 * Windows reports an error (a datagram is received whole or not at all);
		 * Linux returns only what fits in the buffer and discards the rest.
		 */
		receive := s.receiveUdpTaskPool.Get()
		count, addr, err := uConn.ReadFromUDP(receive.Data)
		if err != nil {
			s.receiveUdpTaskPool.Put(receive)
			if !s.isStart {
				// The service was stopped and the socket closed underneath
				// us; the error is permanent, so leave the loop.
				return
			}
			if ne, ok := err.(net.Error); ok && (ne.Temporary() || ne.Timeout()) {
				if tempDelay == 0 {
					tempDelay = 5 * time.Microsecond
				} else {
					tempDelay *= 2
				}
				if tempDelay > maxDelay {
					tempDelay = maxDelay
				}
				s.CallLogHandle(netInterface.Error, "[Udp]客户端上传数据临时异常： ", err)
				time.Sleep(tempDelay)
				continue
			}
			if count > 0 {
				// Data was read together with an error; the message includes
				// the configured buffer size to help diagnose truncation.
				s.CallLogHandle(netInterface.Fatal, "设置缓冲区大小[", s.config.BufferSize, "][Udp]致命异常：", err)
				continue
			}
			s.CallLogHandle(netInterface.Fatal, "[Udp]致命异常：", err)
			continue
		}
		receive.Conn = uConn
		receive.RemoteAddr = addr
		receive.Count = count
		receive.OnCompleted = s.handleConn
		receive.ConnId = hashCode(addr.String()) // hash of the remote address
		if !s.config.OverflowDiscard {
			s.receiveHandler.SendToTaskQueueWait(receive)
		} else if sendErr := s.receiveHandler.SendToTaskQueue(receive); sendErr != nil {
			s.CallLogHandle(netInterface.Fatal, "[UDP处理队列缓存池满]", sendErr)
		}
		// A successful read resets the back-off.
		tempDelay = 0
	}
}

// hashCode returns the 64-bit FNV-1a hash of s.
func hashCode(s string) uint64 {
	digest := fnv.New64a()
	// The write error is discarded, exactly as the original code did.
	_, _ = digest.Write([]byte(s))
	return digest.Sum64()
}

// handleConn processes one received datagram. It is installed as the
// completion callback of every receive task by startReader; the task is
// returned to the pool when this function exits.
//
// Steps:
//  1. Look up the connection stored under "<local addr>#<remote addr>";
//     create, start and store a new one if there is none.
//  2. If a handling strategy is set, call it with the datagram payload.
//     - A nil result or an empty key rejects the datagram: the connection
//     that sent it is stopped and nothing more is done. (Previously the
//     whole service was stopped here, so a single unrecognised packet shut
//     down every listener.)
//     - If the connection already carries a strategy key (HKey) and the new
//     key differs, a fresh connection is created with the new key and the
//     strategy's extra properties, stored in place of the old one, and the
//     old one is stopped.
//     - If the connection has no strategy key yet, the key and the extra
//     properties are attached to it.
//  3. Pass the task to the connection's onCompleted.
func (s *UdpService) handleConn(receive *receiveUdpTask) {
	defer s.receiveUdpTaskPool.Put(receive) // recycle the task when done

	key := fmt.Sprint(receive.Conn.LocalAddr(), "#", receive.RemoteAddr)
	var connection *UdpConnection
	conn, ok := s.connIdAddrMap.Load(key)
	if !ok {
		newConnId := atomic.AddUint64(&s.connId, 1)
		newConnection := NewUdpConnection(s, receive.Conn, newConnId, receive.RemoteAddr)
		newConnection.SetProperty(connKey, key)
		// Start the new connection before publishing it.
		newConnection.Start()
		s.connIdAddrMap.Store(key, newConnection)
		connection = newConnection
	} else {
		connection = conn.(*UdpConnection)
	}

	if s.handleStrategy != nil {
		strategyInfo := s.handleStrategy(connection, receive.Data[:receive.Count])
		if strategyInfo == nil || len(strategyInfo.Key) == 0 {
			// Rejected by the strategy: drop only this connection.
			connection.Stop()
			return
		}
		property, err := connection.GetProperty(HKey)
		if err == nil {
			if strategyInfo.Key != property {
				// The strategy key changed: replace the connection.
				newConnId := atomic.AddUint64(&s.connId, 1)
				newConnection := NewUdpConnection(s, receive.Conn, newConnId, receive.RemoteAddr)
				newConnection.SetProperty(connKey, key)
				newConnection.SetProperty(HKey, strategyInfo.Key)
				for extKey, extVal := range strategyInfo.ExtData {
					newConnection.SetProperty(extKey, extVal)
				}
				// Start and publish the new connection, then stop the old one.
				newConnection.Start()
				s.connIdAddrMap.Store(key, newConnection)
				connection.Stop()
				connection = newConnection
			}
		} else {
			// First strategy result for this connection.
			connection.SetProperty(HKey, strategyInfo.Key)
			for extKey, extVal := range strategyInfo.ExtData {
				connection.SetProperty(extKey, extVal)
			}
		}
	}

	// Let the connection process the datagram.
	connection.onCompleted(receive)
}

// Stop shuts the service down. It is a no-op if the service is not started.
//
// It clears the started flag, closes and unregisters every listening socket
// (logging each one at Info level), clears all connections from the
// connection manager, and stops the receive and reply worker pools.
func (s *UdpService) Stop() {
	if !s.isStart {
		return
	}
	s.isStart = false

	// Close every listener and drop it from the table.
	closeListener := func(addrKey, entry interface{}) bool {
		sock := entry.(*net.UDPConn)
		sock.Close()
		s.CallLogHandle(netInterface.Info, "[udp-Stop]监听服务停止:", addrKey)
		s.listenMap.Delete(sock.LocalAddr().String())
		return true
	}
	s.listenMap.Range(closeListener)

	// Release the remaining connection state.
	s.connMgr.ClearConn()
	// Stop the receive worker pool, then the reply worker pool.
	s.receiveHandler.StopWorkerPool(false)
	s.replyHandle.StopWorkerPool(false)
}

// ToMap returns a snapshot of the worker-pool counters: the task-queue total
// and the handled-count total of the receive pool and of the reply pool,
// keyed by "ReceiveQueueCount", "ReceiveHandleCount", "ReplyQueueCount" and
// "ReplyHandleCount".
func (s *UdpService) ToMap() map[string]int64 {
	stats := make(map[string]int64, 4)
	stats["ReceiveQueueCount"] = s.receiveHandler.GetTotalTaskQueue()
	stats["ReceiveHandleCount"] = s.receiveHandler.GetTotalHandleCount()
	stats["ReplyQueueCount"] = s.replyHandle.GetTotalTaskQueue()
	stats["ReplyHandleCount"] = s.replyHandle.GetTotalHandleCount()
	return stats
}